{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create entry points to spark\n",
    "try:\n",
    "    sc.stop()\n",
    "except:\n",
    "    pass\n",
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession\n",
    "sc=SparkContext()\n",
    "spark = SparkSession(sparkContext=sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Aggregate functions\n",
    "Two aggregate functions:\n",
    "\n",
    "* `aggregate()`\n",
    "* `aggregateByKey()`\n",
    "\n",
    "### `aggregate(zeroValue, seqOp, combOp)`\n",
    "\n",
    "* **zeroValue** is like a data container. Its structure should match with the data structure of the returned values from the seqOp function.\n",
    "* **seqOp** is a function that takes two arguments: the first argument is the zeroValue and the second argument is an element from the RDD. The zeroValue gets updated with the returned value after every run.\n",
    "* **combOp** is a function that takes two arguments: the first argument is the final zeroValue from one partition and the other is another final zeroValue from another partition.\n",
    "\n",
    "The code below calculates the total sum of squares for **mpg** and **disp** in data set **mtcars**."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Step 1: get some data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(mpg=21.0, disp=160.0),\n",
       " Row(mpg=21.0, disp=160.0),\n",
       " Row(mpg=22.8, disp=108.0),\n",
       " Row(mpg=21.4, disp=258.0),\n",
       " Row(mpg=18.7, disp=360.0)]"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "mtcars_df = spark.read.csv('../../data/mtcars.csv', inferSchema=True, header=True).select(['mpg', 'disp'])\n",
    "mtcars_df.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Step 2: calculate averages of mgp and disp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "mpg mean =  20.090625000000003 ; disp mean =  230.721875\n"
     ]
    }
   ],
   "source": [
    "mpg_mean = mtcars_df.select('mpg').rdd.map(lambda x: x[0]).mean()\n",
    "disp_mean = mtcars_df.select('disp').rdd.map(lambda x: x[0]).mean()\n",
    "print('mpg mean = ', mpg_mean, '; ' \n",
    "      'disp mean = ', disp_mean)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Step 3: build **zeroValue, seqOp** and **combOp**\n",
    "\n",
    "We are calculating two TSS. We create a tuple to store two values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "zeroValue = (0, 0) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The **z** below refers to `zeroValue`. Its values get updated after every run. The **x** refers to an element in an RDD partition. In this case, both **z** and **x** have two values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "seqOp = lambda z, x: (z[0] + (x[0] - mpg_mean)**2, z[1] + (x[1] - disp_mean)**2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `combOp` function simply aggrate all `zeroValues` into one. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "combOp = lambda px, py: ( px[0] + py[0], px[1] + py[1] )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Implement `aggregate()` function."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "mtcars_df.rdd.aggregate(zeroValue, seqOp, combOp)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## `aggregateByKey(zeroValue, seqOp, combOp)`\n",
    "\n",
    "This function does similar things as `aggregate()`. The `aggregate()` aggregate all results to the very end, but aggregateByKey() merge results by key."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Import data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['sepal_length,sepal_width,petal_length,petal_width,species',\n",
       " '5.1,3.5,1.4,0.2,setosa']"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "iris_rdd = sc.textFile('../../data/iris.csv', use_unicode=True)\n",
    "iris_rdd.take(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Transform data to a tuple RDD"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('setosa', [5.1, 3.5, 1.4, 0.2]),\n",
       " ('setosa', [4.9, 3.0, 1.4, 0.2]),\n",
       " ('setosa', [4.7, 3.2, 1.3, 0.2]),\n",
       " ('setosa', [4.6, 3.1, 1.5, 0.2]),\n",
       " ('setosa', [5.0, 3.6, 1.4, 0.2])]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "iris_rdd_2 = iris_rdd.map(lambda x: x.split(',')).\\\n",
    "    filter(lambda x: x[0] != 'sepal_length').\\\n",
    "    map(lambda x: (x[-1], [*map(float, x[:-1])]))\n",
    "iris_rdd_2.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Define initial values, seqOp and combOp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "zero_value = (0, 0)\n",
    "seqOp = (lambda x, y: (x[0] + (y[0])**2, x[1] + (y[1])**2))\n",
    "combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Implement `aggregateByKey()`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('setosa', (1259.0899999999997, 591.2500000000002)),\n",
       " ('versicolor', (1774.8600000000001, 388.47)),\n",
       " ('virginica', (2189.9000000000005, 447.33))]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "iris_rdd_2.aggregateByKey(zero_value, seqOp, combOp).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
